#load csv file into oracle table using python
Explore tagged Tumblr posts
ocptechnology · 4 years ago
Text
How to load data from CSV file into oracle using SQL Loader
How to load data from CSV file into oracle using SQL Loader
Hi Friends, in this article, we will load data from CSV file into oracle using SQL loader. SQL Loader is a tool which is providing by oracle. Read: How to create a SEQUENCE step by step What is SQL Loader? SQL Loader helps us to load data from external files into tables of the oracle database. For more details Click Here. Load data in empty or non-empty table We can load data in the empty or…
Tumblr media
View On WordPress
0 notes
technovert · 4 years ago
Text
A DATA INTEGRATION APPROACH TO MAXIMIZE YOUR ROI
The data Integration approach adopted by many data integration projects relies on a set of premium tools leading to cash burnout with RoI less than the standard.
To overcome this and to maximize the RoI, we lay down a data integration approach that makes use of open-source tools over the premium to deliver better results and an even more confident return on the investment.  
Tumblr media
Adopt a two-stage data integration approach:
Part 1 explains the process of setting up technicals and part 2 covers the execution approach involving challenges faced and solutions to the same.
Part 1: Setting Up
The following are the widely relied data sources:
REST API Source with standard NoSQL JSON (with nested datasets)
REST API Source with full data schema in JSON
CSV Files in AWS S3
Relational Tables from Postgres DB
There are 2 different JSON types above in which the former is conventional, and the latter is here
Along with the data movement, it is necessary to facilitate Plug-n-Play architecture, Notifications, Audit data for Reporting, Un-burdened Intelligent scheduling, and setting up all the necessary instances.
The landing Data warehouse chosen was AWS Redshift which is a one-stop for the operational data stores (ODS) as well as facts & dimensions. As said, we completely relied on open-source tools over the tools from tech giants like Oracle, Microsoft, Informatica, Talend, etc.,  
The data integration was successful by leveraging Python, SQL, and Apache Airflow to do all the work. Use Python for Extraction; SQL to Load & Transform the data and Airflow to orchestrate the loads via python-based scheduler code. Below is the data flow architecture.
Data Flow Architecture
Part 2: Execution
The above data flow architecture gives a fair idea of how the data was integrated. The execution is explained in parallel with the challenges faced and how they were solved.
Challenges:
Plug-n-Play Model.  
Dealing with the nested data in JSON.
Intelligent Scheduling.
Code Maintainability for future enhancements.  
1. Plug-n-Play Model
To meet the changing business needs, the addition of columns or a datastore is obvious and if the business is doing great, expansion to different regions is apparent. The following aspects were made sure to ensure a continuous integration process.
A new column will not break the process.
A new data store should be added with minimal work by a non-technical user.
To bring down the time consumed for any new store addition(expansion) integration from days to hours.  
The same is achieved by using:
config table which is the heart of the process holding all the jobs needed to be executed, their last extract dates, and parameters for making the REST API call/extract data from RDBMS.
Re-usable python templates which are read-modified-executed based on the parameters from the config table.
Audit table for logging all the crucial events happening whilst integration.
Control table for mailing and Tableau report refreshes after the ELT process
By creating state-of-art DAGs which can generate DAGs(jobs) with configuration decided in the config table for that particular job.
Any new table which is being planned for the extraction or any new store being added as part of business expansion needs its entries into the config table.
The DAG Generator DAG run will build jobs for you in a snap which will be available in Airflow UI on the subsequent refresh within seconds, and the new jobs are executed on the next schedule along with existing jobs.
2. Dealing with the nested data in JSON.
It is a fact that No-SQL JSONS hold a great advantage from a storage and data redundancy perspective but add a lot of pain while reading the nested data out of the inner arrays.
The following approach is adopted to conquer the above problem:
Configured AWS Redshift Spectrum, with IAM Role and IAM Policy as needed to access AWS Glue Catalog and associating the same with AWS Redshift database cluster
Created external database, external schema & external tables in AWS Redshift database
Created AWS Redshift procedures with specific syntax to read data in the inner array
AWS Redshift was leveraged to parse the data directly from JSON residing in AWS S3 onto an external table (no loading is involved) in AWS Redshift which was further transformed to rows and columns as needed by relational tables.
3. Intelligent Scheduling
There were multiple scenarios in orchestration needs:
Time-based – Batch scheduling; MicroELTs ran to time & again within a day for short intervals.
Event-based – File drop in S3
For the batch scheduling, neither the jobs were run all in series (since it is going to be an underutilization of resources and a time-consuming process) nor in parallel as the workers in airflow will be overwhelmed.  
A certain number of jobs were automated to keep running asynchronously until all the processes were completed. By using a python routine to do intelligent scheduling. The code reads the set of jobs being executed as part of the current batch into a job execution/job config table and keeps running those four jobs until all the jobs are in a completed/failed state as per the below logical flow diagram.
Logical Flow Diagram
For Event-based triggering, a file would be dropped in S3 by an application, and the integration process will be triggered by reading this event and starts the loading process to a data warehouse.
The configuration is as follows:
CloudWatch event which will trigger a Lambda function which in turn makes an API call to trigger Airflow DAG
4. Code Maintainability for future enhancements
A Data Integration project is always collaborative work and maintaining the correct source code is of dire importance. Also, if a deployment goes wrong, the capability to roll back to the original version is necessary.
For projects which involve programming, it is necessary to have a version control mechanism. To have that version control mechanism, configure the GIT repository to hold the DAG files in Airflow with Kubernetes executor.
Take away:
This data integration approach is successful in completely removing the premium costs while decreasing the course of the project. All because of the reliance on open-source tech and utilizing them to the fullest.
By leveraging any ETL tool in the market, the project duration would be beyond 6 months as it requires building a job for each operational data store. The best-recommended option is using scripting in conjunction with any ETL tool to repetitively build jobs that would more or less fall/overlap with the way it is now executed.  
Talk to our Data Integration experts:
Looking for a one-stop location for all your integration needs? Our data integration experts can help you align your strategy or offer you a consultation to draw a roadmap that quickly turns your business data into actionable insights with a robust Data Integration approach and a framework tailored for your specs.
1 note · View note
theresawelchy · 6 years ago
Text
1.1 Billion Taxi Rides: 108-core ClickHouse Cluster
ClickHouse is an open source, columnar-oriented database. It has a sweet spot where 100s of analysts can query non-rolled-up / cubed data quickly, even when tens of billions of new records a day are introduced. The infrastructure costs supporting such a system can come under $100K / year, and potentially half of that if usage permits. Yandex Metrica's ClickHouse installation at one point had 10s of trillions of records. Beyond Yandex, ClickHouse has also seen success recently at Bloomberg and CloudFlare.
Two years ago I benchmarked the database using a single machine and it came out as the fastest free database software I'd seen complete the benchmark. Since then, they've continued to add features including support for Kafka, HDFS and ZStandard compression. Last year they added support for stacking compression methods so that delta-of-delta compression became possible. When compressing time series data, gauge values can compress well with delta encoding but counters will do better with delta-of-delta encoding. Good compression has been a key to ClickHouse's performance.
ClickHouse is made up of 170K lines of C++ code when excluding 3rd-party libraries and is one of the smaller distributed database codebases. For contrast, SQLite doesn't support distribution and has 235K lines of C code. As of this writing, 207 engineers have contributed to ClickHouse and the rate of commits has been accelerating for some time.
In March of 2017, ClickHouse began maintaining a CHANGELOG as an easy way to keep track of developments. They've also broken up the monolithic documentation file into a hierarchy of Markdown-based files. Issues and features for the software are tracked via GitHub and overall this software has become much more approachable in the past few years.
In this post I'm going to take a look at ClickHouse's clustered performance on AWS EC2 using 36-core CPUs and NVMe storage.
Launching an AWS EC2 Cluster
I'll be using three c5d.9xlarge EC2 instances for this post. They each contain 36 vCPUs, 72 GB of RAM, 900 GB of NVMe SSD storage and support 10 Gigabit networking. They cost $1.962 / hour each in eu-west-1 when launched on-demand. I'll be using Ubuntu Server 16.04 LTS for the operating system.
The firewall is setup so each machine can communicate between one another without restrictions but only my IPv4 address is white-listed to SSH into the cluster.
NVMe Storage, Up and Running
On each of the servers I'll create an EXT4-formatted file system on the NVMe storage for ClickHouse to work off of.
$ sudo mkfs -t ext4 /dev/nvme1n1 $ sudo mkdir /ch $ sudo mount /dev/nvme1n1 /ch
Once that's setup you can see its mount point and that 783 GB of capacity is available on each of the systems.
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT loop0 7:0 0 87.9M 1 loop /snap/core/5742 loop1 7:1 0 16.5M 1 loop /snap/amazon-ssm-agent/784 nvme0n1 259:1 0 8G 0 disk └─nvme0n1p1 259:2 0 8G 0 part / nvme1n1 259:0 0 838.2G 0 disk /ch
Filesystem Size Used Avail Use% Mounted on udev 35G 0 35G 0% /dev tmpfs 6.9G 8.8M 6.9G 1% /run /dev/nvme0n1p1 7.7G 967M 6.8G 13% / tmpfs 35G 0 35G 0% /dev/shm tmpfs 5.0M 0 5.0M 0% /run/lock tmpfs 35G 0 35G 0% /sys/fs/cgroup /dev/loop0 88M 88M 0 100% /snap/core/5742 /dev/loop1 17M 17M 0 100% /snap/amazon-ssm-agent/784 tmpfs 6.9G 0 6.9G 0% /run/user/1000 /dev/nvme1n1 825G 73M 783G 1% /ch
The dataset I'll be using in this benchmark is a data dump I've produced of 1.1 billion taxi trips conducted in New York City over a six year period. The Billion Taxi Rides in Redshift blog post goes into detail on how I put this dataset together. They're stored on AWS S3 so I'll configure the AWS CLI with my access and secret keys.
$ sudo apt update $ sudo apt install awscli $ aws configure
I'll set the client's concurrent requests limit to 100 so the files download quicker than they would with stock settings.
$ aws configure set \ default.s3.max_concurrent_requests \ 100
I'll download taxi ride dataset off of AWS S3 and store it on the NVMe drive on the first server. This dataset is ~104 GB when in GZIP-compressed, CSV format.
$ sudo mkdir -p /ch/csv $ sudo chown -R ubuntu /ch/csv $ aws s3 sync s3://<bucket>/csv /ch/csv
Installing ClickHouse
I'll first install a few software installation utilities for Java 8.
$ sudo apt install \ software-properties-common \ python-software-properties
I'll then install Oracle's Java 8 distribution as its needed to run Apache ZooKeeper, a prerequisite of a distributed ClickHouse setup.
$ sudo add-apt-repository ppa:webupd8team/java $ sudo apt update $ sudo apt install oracle-java8-installer
I'll then use Ubuntu's package management to install ClickHouse 18.16.1 and ZooKeeper on all three machines.
$ sudo apt-key adv \ --keyserver hkp://keyserver.ubuntu.com:80 \ --recv E0C56BD4 $ echo "deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" | \ sudo tee /etc/apt/sources.list.d/clickhouse.list $ sudo apt-get update $ sudo apt install \ clickhouse-client \ clickhouse-server \ zookeeperd
I'll create a data directory for ClickHouse as well as some configuration overrides on all three servers.
$ sudo mkdir /ch/clickhouse $ sudo chown -R clickhouse /ch/clickhouse $ sudo mkdir -p /etc/clickhouse-server/conf.d $ sudo vi /etc/clickhouse-server/conf.d/taxis.conf
These are the configuration overrides I'll be using.
<?xml version="1.0"?> <yandex> <listen_host>0.0.0.0</listen_host> <path>/ch/clickhouse/</path> <remote_servers> <perftest_1shards_3replicas> <shard> <replica> <host>172.30.2.200</host> <port>9000</port> </replica> <replica> <host>172.30.2.214</host> <port>9000</port> </replica> <replica> <host>172.30.2.174</host> <port>9000</port> </replica> </shard> </perftest_1shards_3replicas> </remote_servers> <zookeeper-servers> <node> <host>172.30.2.200</host> <port>2181</port> </node> <node> <host>172.30.2.214</host> <port>2181</port> </node> <node> <host>172.30.2.174</host> <port>2181</port> </node> </zookeeper-servers> <macros> <shard>01</shard> <replica>01</replica> </macros> </yandex>
I'll then launch ZooKeeper and the ClickHouse Server on all three machines.
$ sudo /etc/init.d/zookeeper start $ sudo service clickhouse-server start
Loading Data into ClickHouse
On the first server I'll create a trips table that will hold the taxi trips dataset using the Log engine.
$ clickhouse-client --host=0.0.0.0
CREATE TABLE trips ( trip_id UInt32, vendor_id String, pickup_datetime DateTime, dropoff_datetime Nullable(DateTime), store_and_fwd_flag Nullable(FixedString(1)), rate_code_id Nullable(UInt8), pickup_longitude Nullable(Float64), pickup_latitude Nullable(Float64), dropoff_longitude Nullable(Float64), dropoff_latitude Nullable(Float64), passenger_count Nullable(UInt8), trip_distance Nullable(Float64), fare_amount Nullable(Float32), extra Nullable(Float32), mta_tax Nullable(Float32), tip_amount Nullable(Float32), tolls_amount Nullable(Float32), ehail_fee Nullable(Float32), improvement_surcharge Nullable(Float32), total_amount Nullable(Float32), payment_type Nullable(String), trip_type Nullable(UInt8), pickup Nullable(String), dropoff Nullable(String), cab_type Nullable(String), precipitation Nullable(Int8), snow_depth Nullable(Int8), snowfall Nullable(Int8), max_temperature Nullable(Int8), min_temperature Nullable(Int8), average_wind_speed Nullable(Int8), pickup_nyct2010_gid Nullable(Int8), pickup_ctlabel Nullable(String), pickup_borocode Nullable(Int8), pickup_boroname Nullable(String), pickup_ct2010 Nullable(String), pickup_boroct2010 Nullable(String), pickup_cdeligibil Nullable(FixedString(1)), pickup_ntacode Nullable(String), pickup_ntaname Nullable(String), pickup_puma Nullable(String), dropoff_nyct2010_gid Nullable(UInt8), dropoff_ctlabel Nullable(String), dropoff_borocode Nullable(UInt8), dropoff_boroname Nullable(String), dropoff_ct2010 Nullable(String), dropoff_boroct2010 Nullable(String), dropoff_cdeligibil Nullable(String), dropoff_ntacode Nullable(String), dropoff_ntaname Nullable(String), dropoff_puma Nullable(String) ) ENGINE = Log;
I'll then decompress and load each of the CSV files into the trips table. The following completed in 55 minutes and 10 seconds. The data directory was 134 GB in size following this operation.
$ time (for FILENAME in /ch/csv/trips_x*.csv.gz; do gunzip -c $FILENAME | \ clickhouse-client \ --host=0.0.0.0 \ --query="INSERT INTO trips FORMAT CSV" done)
The import rate was 155 MB/s of uncompressed CSV content. I suspect this was due to a bottleneck with GZIP decompression. It might have been quicker to decompress all the gzip files in parallel using xargs and then load in the decompressed data. Below is what glances was reporting during the CSV import process.
$ sudo apt install glances $ sudo glances
ip-172-30-2-200 (Ubuntu 16.04 64bit / Linux 4.4.0-1072-aws) Uptime: 0:11:42 CPU 8.2% nice: 0.0% LOAD 36-core MEM 9.8% active: 5.20G SWAP 0.0% user: 6.0% irq: 0.0% 1 min: 2.24 total: 68.7G inactive: 61.0G total: 0 system: 0.9% iowait: 1.3% 5 min: 1.83 used: 6.71G buffers: 66.4M used: 0 idle: 91.8% steal: 0.0% 15 min: 1.01 free: 62.0G cached: 61.6G free: 0 NETWORK Rx/s Tx/s TASKS 370 (507 thr), 2 run, 368 slp, 0 oth sorted automatically by cpu_percent, flat view ens5 136b 2Kb lo 343Mb 343Mb CPU% MEM% VIRT RES PID USER NI S TIME+ IOR/s IOW/s Command 100.4 1.5 1.65G 1.06G 9909 ubuntu 0 S 1:01.33 0 0 clickhouse-client --host=0.0.0.0 --query=INSERT INTO trips FORMAT CSV DISK I/O R/s W/s 85.1 0.0 4.65M 708K 9908 ubuntu 0 R 0:50.60 32M 0 gzip -d -c /ch/csv/trips_xac.csv.gz loop0 0 0 54.9 5.1 8.14G 3.49G 8091 clickhous 0 S 1:44.23 0 45M /usr/bin/clickhouse-server --config=/etc/clickhouse-server/config.xml loop1 0 0 4.5 0.0 0 0 319 root 0 S 0:07.50 1K 0 kworker/u72:2 nvme0n1 0 3K 2.3 0.0 91.1M 28.9M 9912 root 0 R 0:01.56 0 0 /usr/bin/python3 /usr/bin/glances nvme0n1p1 0 3K 0.3 0.0 0 0 960 root -20 S 0:00.10 0 0 kworker/28:1H nvme1n1 32.1M 495M 0.3 0.0 0 0 1058 root -20 S 0:00.90 0 0 kworker/23:1H
I'll first free up some space on the NVMe drive by removing the source CSV files before continuing.
Converting into Columnar Form
ClickHouse's Log engine will store data in a row-centric format. In order to query the data faster I'll convert it into a columnar-centric format using the MergeTree engine.
$ clickhouse-client --host=0.0.0.0
The following completed in 43 minutes and 56 seconds. The data directory was 347 GB in size following this operation.
CREATE TABLE trips_mergetree ENGINE = MergeTree(pickup_date, pickup_datetime, 8192) AS SELECT trip_id, CAST(vendor_id AS Enum8('1' = 1, '2' = 2, 'CMT' = 3, 'VTS' = 4, 'DDS' = 5, 'B02512' = 10, 'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14)) AS vendor_id, toDate(pickup_datetime) AS pickup_date, ifNull(pickup_datetime, toDateTime(0)) AS pickup_datetime, toDate(dropoff_datetime) AS dropoff_date, ifNull(dropoff_datetime, toDateTime(0)) AS dropoff_datetime, assumeNotNull(store_and_fwd_flag) AS store_and_fwd_flag, assumeNotNull(rate_code_id) AS rate_code_id, assumeNotNull(pickup_longitude) AS pickup_longitude, assumeNotNull(pickup_latitude) AS pickup_latitude, assumeNotNull(dropoff_longitude) AS dropoff_longitude, assumeNotNull(dropoff_latitude) AS dropoff_latitude, assumeNotNull(passenger_count) AS passenger_count, assumeNotNull(trip_distance) AS trip_distance, assumeNotNull(fare_amount) AS fare_amount, assumeNotNull(extra) AS extra, assumeNotNull(mta_tax) AS mta_tax, assumeNotNull(tip_amount) AS tip_amount, assumeNotNull(tolls_amount) AS tolls_amount, assumeNotNull(ehail_fee) AS ehail_fee, assumeNotNull(improvement_surcharge) AS improvement_surcharge, assumeNotNull(total_amount) AS total_amount, assumeNotNull(payment_type) AS payment_type_, assumeNotNull(trip_type) AS trip_type, pickup AS pickup, pickup AS dropoff, CAST(assumeNotNull(cab_type) AS Enum8('yellow' = 1, 'green' = 2)) AS cab_type, precipitation AS precipitation, snow_depth AS snow_depth, snowfall AS snowfall, max_temperature AS max_temperature, min_temperature AS min_temperature, average_wind_speed AS average_wind_speed, pickup_nyct2010_gid AS pickup_nyct2010_gid, pickup_ctlabel AS pickup_ctlabel, pickup_borocode AS pickup_borocode, pickup_boroname AS pickup_boroname, pickup_ct2010 AS pickup_ct2010, pickup_boroct2010 AS pickup_boroct2010, pickup_cdeligibil AS pickup_cdeligibil, pickup_ntacode AS pickup_ntacode, pickup_ntaname AS pickup_ntaname, pickup_puma AS pickup_puma, dropoff_nyct2010_gid AS dropoff_nyct2010_gid, dropoff_ctlabel AS dropoff_ctlabel, dropoff_borocode AS dropoff_borocode, dropoff_boroname AS dropoff_boroname, dropoff_ct2010 AS dropoff_ct2010, dropoff_boroct2010 AS dropoff_boroct2010, dropoff_cdeligibil AS dropoff_cdeligibil, dropoff_ntacode AS dropoff_ntacode, dropoff_ntaname AS dropoff_ntaname, dropoff_puma AS dropoff_puma FROM trips;
This is what glances looked like during the operation:
ip-172-30-2-200 (Ubuntu 16.04 64bit / Linux 4.4.0-1072-aws) Uptime: 1:06:09 CPU 10.3% nice: 0.0% LOAD 36-core MEM 16.1% active: 13.3G SWAP 0.0% user: 7.9% irq: 0.0% 1 min: 1.87 total: 68.7G inactive: 52.8G total: 0 system: 1.6% iowait: 0.8% 5 min: 1.76 used: 11.1G buffers: 71.8M used: 0 idle: 89.7% steal: 0.0% 15 min: 1.95 free: 57.6G cached: 57.2G free: 0 NETWORK Rx/s Tx/s TASKS 367 (523 thr), 1 run, 366 slp, 0 oth sorted automatically by cpu_percent, flat view ens5 1Kb 8Kb lo 2Kb 2Kb CPU% MEM% VIRT RES PID USER NI S TIME+ IOR/s IOW/s Command 241.9 12.8 20.7G 8.78G 8091 clickhous 0 S 30:36.73 34M 125M /usr/bin/clickhouse-server --config=/etc/clickhouse-server/config.xml DISK I/O R/s W/s 2.6 0.0 90.4M 28.3M 9948 root 0 R 1:18.53 0 0 /usr/bin/python3 /usr/bin/glances loop0 0 0 1.3 0.0 0 0 203 root 0 S 0:09.82 0 0 kswapd0 loop1 0 0 0.3 0.1 315M 61.3M 15701 ubuntu 0 S 0:00.40 0 0 clickhouse-client --host=0.0.0.0 nvme0n1 0 3K 0.3 0.0 0 0 7 root 0 S 0:00.83 0 0 rcu_sched nvme0n1p1 0 3K 0.0 0.0 0 0 142 root 0 S 0:00.22 0 0 migration/27 nvme1n1 25.8M 330M 0.0 0.0 59.7M 1.79M 2764 ubuntu 0 S 0:00.00 0 0 (sd-pam)
In the last benchmark several columns were cast and re-computed. I found a number of those functions no longer worked properly on this dataset. In order to get around this I removed the offending functions and loaded in the data without casting into more granular data types.
Distributing Data Across the Cluster
I'll be distributing the data across all three nodes in the cluster. To start, I'll create the table below on all three machines.
$ clickhouse-client --host=0.0.0.0
CREATE TABLE trips_mergetree_third ( trip_id UInt32, vendor_id String, pickup_date Date, pickup_datetime DateTime, dropoff_date Date, dropoff_datetime Nullable(DateTime), store_and_fwd_flag Nullable(FixedString(1)), rate_code_id Nullable(UInt8), pickup_longitude Nullable(Float64), pickup_latitude Nullable(Float64), dropoff_longitude Nullable(Float64), dropoff_latitude Nullable(Float64), passenger_count Nullable(UInt8), trip_distance Nullable(Float64), fare_amount Nullable(Float32), extra Nullable(Float32), mta_tax Nullable(Float32), tip_amount Nullable(Float32), tolls_amount Nullable(Float32), ehail_fee Nullable(Float32), improvement_surcharge Nullable(Float32), total_amount Nullable(Float32), payment_type Nullable(String), trip_type Nullable(UInt8), pickup Nullable(String), dropoff Nullable(String), cab_type Nullable(String), precipitation Nullable(Int8), snow_depth Nullable(Int8), snowfall Nullable(Int8), max_temperature Nullable(Int8), min_temperature Nullable(Int8), average_wind_speed Nullable(Int8), pickup_nyct2010_gid Nullable(Int8), pickup_ctlabel Nullable(String), pickup_borocode Nullable(Int8), pickup_boroname Nullable(String), pickup_ct2010 Nullable(String), pickup_boroct2010 Nullable(String), pickup_cdeligibil Nullable(FixedString(1)), pickup_ntacode Nullable(String), pickup_ntaname Nullable(String), pickup_puma Nullable(String), dropoff_nyct2010_gid Nullable(UInt8), dropoff_ctlabel Nullable(String), dropoff_borocode Nullable(UInt8), dropoff_boroname Nullable(String), dropoff_ct2010 Nullable(String), dropoff_boroct2010 Nullable(String), dropoff_cdeligibil Nullable(String), dropoff_ntacode Nullable(String), dropoff_ntaname Nullable(String), dropoff_puma Nullable(String) ) ENGINE = MergeTree(pickup_date, pickup_datetime, 8192);
I'll then make sure the first server can see all three nodes in the cluster.
SELECT * FROM system.clusters WHERE cluster = 'perftest_1shards_3replicas' FORMAT Vertical;
Row 1: ────── cluster: perftest_1shards_3replicas shard_num: 1 shard_weight: 1 replica_num: 1 host_name: 172.30.2.200 host_address: 172.30.2.200 port: 9000 is_local: 1 user: default default_database: Row 2: ────── cluster: perftest_1shards_3replicas shard_num: 1 shard_weight: 1 replica_num: 2 host_name: 172.30.2.214 host_address: 172.30.2.214 port: 9000 is_local: 1 user: default default_database: Row 3: ────── cluster: perftest_1shards_3replicas shard_num: 1 shard_weight: 1 replica_num: 3 host_name: 172.30.2.174 host_address: 172.30.2.174 port: 9000 is_local: 1 user: default default_database:
I'll then define a new table on the first server that's based on the trips_mergetree_third schema and uses the Distributed engine.
CREATE TABLE trips_mergetree_x3 AS trips_mergetree_third ENGINE = Distributed(perftest_1shards_3replicas, default, trips_mergetree_third, rand());
I'll then copy the data out of the MergeTree-based table and onto all three servers. The following completed in 58 minutes and 52 seconds.
INSERT INTO trips_mergetree_x3 SELECT * FROM trips_mergetree;
Following the above operation I gave ClickHouse 15 minutes to recede from its storage high-water mark. The data directories ended up being 421 GB, 144 GB and 144 GB in size respectively on each of the three servers.
ClickHouse Cluster Benchmark
The following were the fastest times I saw after running each query multiple times on the trips_mergetree_x3 table.
$ clickhouse-client --host=0.0.0.0
The following completed in 2.502 seconds.
SELECT cab_type, count(*) FROM trips_mergetree_x3 GROUP BY cab_type;
The following completed in 1.880 seconds.
SELECT passenger_count, avg(total_amount) FROM trips_mergetree_x3 GROUP BY passenger_count;
The following completed in 1.609 seconds.
SELECT passenger_count, toYear(pickup_date) AS year, count(*) FROM trips_mergetree_x3 GROUP BY passenger_count, year;
The following completed in 2.681 seconds.
SELECT passenger_count, toYear(pickup_date) AS year, round(trip_distance) AS distance, count(*) FROM trips_mergetree_x3 GROUP BY passenger_count, year, distance ORDER BY year, count(*) DESC;
While running query 1 I could see the first server's CPU was at ~50% utilisation while the other machines remained somewhat idle. This is a snippet from Glances:
CPU% MEM% VIRT RES PID USER NI S TIME+ IOR/s IOW/s Command 1508.2 1.9 50.7G 1.32G 8091 clickhous 0 S 0:54.98 0 3K /usr/bin/clickhouse-server --config=/etc/clickhouse-server/config.xml
The above struck me as odd. I was expecting a single query to speed up as I scaled out horizontally. To add to that I was expecting each queries to take longer than their previous siblings.
I decided to run the same queries on the MergeTree-based table which sits solely on the first server.
ClickHouse Single-Node Benchmark
The following were the fastest times I saw after running each query multiple times on the trips_mergetree table.
The following completed in 0.241 seconds.
SELECT cab_type, count(*) FROM trips_mergetree GROUP BY cab_type;
The following completed in 0.826 seconds.
SELECT passenger_count, avg(total_amount) FROM trips_mergetree GROUP BY passenger_count;
The following completed in 1.209 seconds.
SELECT passenger_count, toYear(pickup_date) AS year, count(*) FROM trips_mergetree GROUP BY passenger_count, year;
The following completed in 1.781 seconds.
SELECT passenger_count, toYear(pickup_date) AS year, round(trip_distance) AS distance, count(*) FROM trips_mergetree GROUP BY passenger_count, year, distance ORDER BY year, count(*) DESC;
Thoughts on the Results
This is the first time a free CPU-based database has managed to out-perform a GPU-based database in my benchmarks. That GPU database has since undergone two revisions but nonetheless, the performance ClickHouse has found on a single node is very impressive.
That being said, there is an order of magnitude of overhead when running Query 1 on the distributed engine. I'm hoping I've missed something in my research for this post because it would be good to see query times drop as I add more nodes to the cluster.
I've come across many setups where a single query won't be able to consume an entire cluster's resources but can run well concurrently with other queries. Given the single-node performance I'd consider ClickHouse for this sort of workload.
It would be nice to see ClickHouse evolve in such a way that storage and compute could somehow be disconnected so that they could scale independently. The HDFS support that has been added in the last year could be a step towards this. On the compute side, if a single query can be sped up as more nodes are added to the cluster then the future for this software will be very bright.
Thank you for taking the time to read this post. I offer consulting, architecture and hands-on development services to clients in North America & Europe. If you'd like to discuss how my offerings can help your business please contact me via
LinkedIn
.
DataTau published first on DataTau
0 notes
bionicly-blog · 8 years ago
Text
Data Insights Case Study Part 0 - Question & Setup
Tumblr media
So I decided to see just how much insight you can glean from “data in the wild”. You know, data that hasn’t been cleaned (or cleaned enough) and no textbook is ever going to use that to teach folks about algorithms using such data because it’s going to take a couple years at the rate the data needs to be cleaned and transformed first. This is going to be a multi-part post on getting insight in such a real-world case. It’s going to be hard and annoying.
But you always start with a question first. The question I had from a friend was, “does better/more education for girls and young women lead to better economic performance for a given country”? This is a good question, and our intuitive answer is “yes”. But the question then becomes, how, and how much?
To answer that question...well you need data, and hopefully a lot of it. This is where Kaggle comes in. Specifically, this data set on World Development Indicators. While it isn’t by any means the biggest data set ever, it is quite huge compared to classroom data sets used to teach you data science or machine learning.
And because it’s real data, it doesn’t behave like you’d expect or want it to. More on that later. But first, given it’s (at the time of writing) 350+ MB worth of data, how do you even decide where to start?
To me, it’s a bit of an iterative cycle. You start with a hypothesis (more education for girls -> better economic performance) and try to identify likely variables that may help answer this question. There’s always caveats like cause and effect vs correlation, overfitting, etc. etc., but to me, always start simple by looking at things and making a list of variables you think may help you down the road. And then you can worry about the caveats later.
Tumblr media
But first:
1. This is a huge file (see a snapshot of indicators.csv above) for local processing (say, a Macbook Air). Excel ain’t gonna cut it. You’ll need to load this into a relational database. I chose Postgresql but really, for something like this you can choose whatever else you like, such as MySQL or Oracle...
2. Once you’ve setup the relational DB, the next step is to create the tables to load both the raw data (indicators.csv) and the metadata (country.csv). Just run “head indicators.csv” at the terminal to take a look at the names of the headers and first few rows of data, to get a sense of how the tables should be set up. Then run something similar to the command below to load the data into the pre-created tables:
copy indicators_raw FROM '/path/to/file/Indicators.csv' delimiter ',' CSV;
3. Check to make sure the data wasn’t loaded improperly or somehow corrupted (unlikely to happen but you don’t want to try data analysis on something that was wrong in the first place). Maybe do a simple count(*) or just spot check here and there.
4. Start writing Python scripts to help you manage the data exploration and wrangling. More on this in the next post, but a preview of where this is going is the following function, which is just a helper function to load and normalize data:
Tumblr media
0 notes